package com.bw.app;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

    /**
     * Flink streaming job: reads JSON records from the Kafka topic
     * {@code my_test_top_1}, computes a running {@code sum(vc)} per {@code id},
     * and upserts the results into the upsert-kafka topic {@code my_test_top_2}.
     *
     * <p>The sink uses the upsert-kafka connector, so each new aggregate value
     * for a key overwrites the previous one (keyed by the PRIMARY KEY column).
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task keeps local test output deterministic and easy to follow.
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: plain kafka connector, JSON format, starting from the latest offset
        // so only records produced after job start are consumed.
        tableEnv.executeSql("CREATE TABLE test (\n" +
                "  `id` STRING,\n" +
                "  `ts` BIGINT,\n" +
                "  `vc` BIGINT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'my_test_top_1',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // Alias the aggregate so the query schema is (id, s) instead of an
        // auto-generated column name like EXPR$1; insertion into the sink is
        // positional, and the alias makes that mapping explicit to readers.
        Table table = tableEnv.sqlQuery("select id, sum(vc) as s from test group by id");

        // Sink table: upsert-kafka requires a PRIMARY KEY (NOT ENFORCED); changelog
        // updates from the group-by aggregation are written as key-based upserts.
        tableEnv.executeSql("CREATE TABLE test1 (\n" +
                "  `i` STRING,\n" +
                "  `s` BIGINT,\n" +
                "  PRIMARY KEY (i) NOT ENFORCED \n" +
                ") WITH (\n" +
                "  'connector' = 'upsert-kafka',\n" +
                "  'topic' = 'my_test_top_2',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092',\n" +
                "  'key.format' = 'json',\n" +
                "  'value.format' = 'json'\n" +
                ")");

        // Columns map by position: id -> i, s -> s. execute() submits the job.
        table.insertInto("test1").execute();
    }
}
